我會從文檔中挑幾個重要的出來講,但不會是全部,我會著重一些基本的中間操作,異常,取消,dispatcher
文檔
doc
首先,之前看過的影片分享了RxJava和Flow相同功能操作符的簡化,可以發現真的可以少記很多東西
今天就是要來講講,flow的操作符
(1..3).asFlow()
或是
flow{
emit(1)
emit(2)
emit(3)
}
或是
listOf(1,3,6).asFlow()
或是
flowOf(1,4,6)
建構式其實沒啥好講的,快速帶過
他做為intermediary,我覺得蠻特別的是它可以emit其他值
(1..3).asFlow() // 一个请求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> Timber.d(response) }
Making request 0
response 0
Making request 1
response 1
Making request 2
response 2
但有個小小混淆的點
flow{ //producer
emit(0)
emit(1)
emit(2)
}
.transform { request -> //intermediary
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> Timber.d(response) } //consumer
這樣的話會collect幾個results呢?
和上面的範例一樣6個,請記住,儘管你看到了很多emit,但producer和intermediary的emit操作並不會重複
為了方便比較時間,我打delay從100改到1000
before
flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
.collect { response ->
delay(1000)
Timber.d(response.toString())
}
//15:24:16.819
//15:24:18.825
//15:24:20.831
after
flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
.buffer()
.collect { response ->
delay(1000)
Timber.d(response.toString())
}
//15:25:18.267
//15:25:19.275
//15:25:20.282
加了buffer後,collect和emit會在不同的coroutine運作,而中間的emit會透過channel的send在不同的coroutine交流,但這些都不需要我們實作,我們只需要加buffer就好,棒棒
lifecycleScope.launch {
(0..3).asFlow()
.collect { response ->
Timber.d(response.toString())
}
}
由於 collect 是在main thread调用的,那整個flow也會在main thread執行。 这是快速运行或异步代码的理想默认形式,它不关心执行的上下文并且不会阻塞调用者。
儘管他不會阻塞調用者,但我們也會在某些情況下切換dispatcher,例如需要cpu的大量計算,我們可以用flowOn,而不是withContext
這篇有講到會報什麼錯
(1..10).asFlow()
.map { Timber.d( "map ${Thread.currentThread().name}" ) }
.collect {
Timber.d( "collect ${Thread.currentThread().name}" )
}
// map main
// collect main
// map main
// collect main
// map main
// collect main
(1..10).asFlow()
.map { Timber.d( "map ${Thread.currentThread().name}" ) }
.flowOn(Dispatchers.Default)//只影響這行以上的代碼
.collect {
Timber.d( "collect ${Thread.currentThread().name}" )
}
//map DefaultDispatcher-worker-2
// collect main
//map DefaultDispatcher-worker-2
// collect main
//map DefaultDispatcher-worker-2
// collect main
非常重要的異常處理來了,儘管try/ catch能處理collect的Exception,但在emit卻不應使用try catch
Flows must be transparent to exceptions and it is a violation of the exception transparency to emit values in the flow { ... } builder from inside of a try/catch block. This guarantees that a collector throwing an exception can always catch it using try/catch as in the previous example.
簡單說在建構式用try/catch違反flow對異常的公開性,使用catch,可以保證Exception的公開性,並允許封裝異常處理
開發者可以在catch裡面重新throw Exception,用emit傳送值出去,或看你想怎麼處理
(1..10).asFlow()
.catch { } //只能處理這行以上的代碼
.collect { }
那如果在collect時跑出異常怎麼辦?
不怕,我直接借文檔來告訴你,透過將collect的動作搬到onEach,就可以用catch處理了
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()//注意這裡不是lambda了
如果不喜歡這種做法,請記得collect可以用try/catch
val ff = (0..3).asFlow()
try{
ff.collect{}
} catch {
}
從前面的範例大約可以看出有不同的flow建構式,而他們在ensureActive的處理上有些許不同
這個會在每次emit都ensureActive一次
flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
但這個不會
(1..10).asFlow()
意思你在某個條件下想要提早結束flow,第二個不僅會全部跑完,還會在跑完後才告訴你這個flow被取消了
但我們也不需要自己動寫ensureActive,我們只需要加個cancellable()
//跑到結束
(1..5).asFlow().collect {}
//會檢查是否取消
(1..5).asFlow().cancellable().collect {}
可以透過中間計算,並將值約束到單個
val sum = (1..5).asFlow()
.map { it * it } // 数字 1 至 5 的平方
.reduce { a, b ->
println(a)
a + b
} // 求和(末端操作符)
println(sum)
//a 1
//a 5
//a 14
//a 30
//sum 55
其他的其實也很重要,但我能介紹的有限,而且以基本操作已經足夠了,生魚的我認為有空時或是要傭到時,再去了了解就好